package com.flink.examples.elasticsearch;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Map;
/**
 * @Description 从elasticsearch中获取数据并输出到DataStream数据流中
 * @Author JL
 * @Date 2020/09/18
 * @Version V1.0
 */
public class DataStreamSource {
    /**
     * 官方文档：https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
            private RestClientBuilder builder = null;
            //job开始执行，调用此方法创建数据源连接对象,该方法主要用于打开连接
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
            }
            //执行查询并对数据进行封装
            @Override
            public void run(SourceContext<TUser> ctx) throws Exception {
                Gson gson = new Gson();
                RestHighLevelClient client = null;
                //匹配查询
                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
                sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
                //定义索引库
                SearchRequest request = new SearchRequest();
                request.types("doc");
                request.indices("flink_demo");
                request.source(sourceBuilder);
                try {
                    client = new RestHighLevelClient(builder);
                    SearchResponse response = client.search(request, new Header[]{});
                    SearchHits hits = response.getHits();
                    System.out.println("查询结果有" + hits.getTotalHits() + "条");
                    for (SearchHit searchHits : hits ) {
                        Map<String,Object> dataMap = searchHits.getSourceAsMap();
                        TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
                        ctx.collect(user);
                    }
                    //ID查询
//                    GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
//                    client = new RestHighLevelClient(builder);
//                    GetResponse getResponse = client.get(request, new Header[]{});
//                    Map<String,Object> dataMap = getResponse.getSourceAsMap();
//                    TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
//                    ctx.collect(user);
                }catch(IOException ioe){
                    ioe.printStackTrace();
                }finally {
                    if (client != null){
                        client.close();
                    }
                }
            }
            //Job结束时调用
            @Override
            public void cancel() {
                try {
                    super.close();
                } catch (Exception e) {
                }
                builder = null;
            }
        });
        dataStream.print();
        env.execute("flink es to data job");
    }

}
